#include "config.h"

#include <unistd.h>
#include <string.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBCHUNK

#include "configure.h"
#include "chunk_bh.h"
#include "chunk_ops.h"
#include "net_global.h"
#include "../replica/replica.h"
#include "../controller/md_proto.h"
#include "chunk_cleanup.h"
#include "chunk_proto.h"
#include "dbg.h"
#include "../controller/volume_proto_eclog.h"

/*
 * chunk ec layout by Gabe:
 *  e.g. ec(2+1):
 *  ec->m = 3
 *  ec->k = 2
 *  strip_block = 4K
 *
 *     ec->m = repnum = 3
 *  /----------------------\
 *      ec->k = 2
 *     /---------\
 *   rep[0]   rep[1]   rep[2]
 *  /      \ /      \ /      \
 *  -------- -------- --------  --------
 *  | abcd | | efgh | | xxxx |  strip
 *  -------- -------- --------  --------
 *  | ijkl | | mopq | | xxxx |  strip
 *  -------- -------- --------  --------
 *  | rstu | | vwxy | | xxxx |  strip
 *  -------- -------- --------  --------
 *  | z*** | | **** | | xxxx |  strip
 *  -------- -------- --------  --------
 *
 *  read: align to strip, read whole strips, pick out the requested data from the strips
 *  write: align to strip, read whole strips, modify the written data, encode the ec code, write the strips to the chunk.
 */

/*
 * Parameter bundle for the chunk ec helper routines in this file.
 * Every field is filled in by args_init().
 */
typedef struct {
        /* forwarded as the pool argument of the chunk push helpers */
        char *pool;
        /* by-value copy of the caller's parent id */
        fileid_t parent;
        /* chunk info; its id and info_version are read when pushing */
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        vfm_t *vfm;
        int op;
        int flags;
        int tier;

        /* by-value copy of the caller's ec descriptor */
        ec_t ec;
        clockstat_t *clockstat;
        clockstat_t *clocks;
        unsigned char *src_in_err;
} args_t;

/*
 * Forward declarations of the strip encode/decode helpers, which are
 * defined further down in this file.
 */
STATIC int __ec_strips_encode(const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *strips);
STATIC int __ec_strips_decode(const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *strips, unsigned char *src_in_err);

/*
 * Populate every field of *args from the supplied values.
 *
 * parent and ec are copied by value; all other pointer arguments are
 * stored as-is (the const qualifier on pool is cast away for storage).
 */
static inline void args_init(args_t *args, const char *pool, const fileid_t *parent,
                             chkinfo_t *chkinfo, chkstat_t *chkstat, vfm_t *vfm,
                             int op, int tier, int flags, const ec_t *ec,
                             clockstat_t *clockstat, clockstat_t *clocks, unsigned char *src_in_err)
{
        /* values copied into the bundle */
        args->parent = *parent;
        args->ec = *ec;

        /* plain scalars */
        args->op = op;
        args->flags = flags;
        args->tier = tier;

        /* pointers stored without copying the pointee */
        args->pool = (char *)pool;
        args->chkinfo = chkinfo;
        args->chkstat = chkstat;
        args->vfm = vfm;
        args->clockstat = clockstat;
        args->clocks = clocks;
        args->src_in_err = src_in_err;
}

/*
 * Work out which strip rows an io touches and initialise the strip buffers.
 *
 * One strip row holds ec->k * STRIP_BLOCK bytes of data.  *off receives
 * the index of the first row touched by io, *count the number of rows
 * needed to cover [io->offset, io->offset + io->size).
 *
 * All EC_MMAX entries of strips are initialised:
 *   - initzero:  empty buffer, then *count * STRIP_BLOCK zero bytes appended
 *   - allocate:  mbuffer_init() with *count * STRIP_BLOCK
 *   - otherwise: mbuffer_init() with 0
 */
STATIC void __ec_strips_init(const io_t *io, const ec_t *ec,
                uint32_t *off, uint32_t *count, buffer_t *strips,
                int allocate, int initzero)
{
        size_t row_bytes;
        buffer_t *strip;

        YASSERT(ec->m <= EC_MMAX);
        YASSERT(ec->k <= EC_KMAX);

        row_bytes = ec->k * STRIP_BLOCK;
        *off = io->offset / row_bytes;
        *count = _ceil(io->size + (io->offset % row_bytes), row_bytes);

        YASSERT(*off < STRIP_MAX);
        YASSERT(*count > 0 && *count <= STRIP_MAX);

        for (strip = strips; strip != strips + EC_MMAX; strip++) {
                if (initzero) {
                        mbuffer_init(strip, 0);
                        mbuffer_appendzero(strip, *count * STRIP_BLOCK);
                        continue;
                }

                if (allocate)
                        mbuffer_init(strip, *count * STRIP_BLOCK);
                else
                        mbuffer_init(strip, 0);
        }
}

/* Release all EC_MMAX strip buffers of the array. */
STATIC void __ec_strips_free(buffer_t *strips)
{
        buffer_t *strip = strips;
        buffer_t *end = strips + EC_MMAX;

        while (strip != end) {
                mbuffer_free(strip);
                strip++;
        }
}

/*
 * Read io from replica i of the chunk into strips[i].
 *
 * Returns ENONET when the replica's ltime in chkstat is 0, EAGAIN when
 * the replica location carries a non-zero status, the network_connect()
 * error if connecting fails, or the read error.  An EPERM read error is
 * mapped to EAGAIN after closing the connection.
 */
STATIC int __ec_replica_read(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                io_t *io, uint32_t i, buffer_t *strips)
{
        int ret;
        time_t ltime;
        const char *why;
        const reploc_t *reploc = &chkinfo->diskid[i];
        const nid_t *nid = &reploc->id;

        if (unlikely(chkstat->repstat[i].ltime == 0)) {
                DBUG("%s not online\n", network_rname(&reploc->id));
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        if (unlikely(reploc->status != 0)) {
                DBUG("%s status %u\n", network_rname(&reploc->id), reploc->status);
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = network_connect(nid, &ltime, 1, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* pick the in-process or rpc read path; failures are handled once below */
        if (net_islocal(nid)) {
                DBUG("read "CHKID_FORMAT" %u %llu clock:%ld local\n",
                     CHKID_ARG(&io->id), io->size, (LLU)io->offset, io->vclock.clock);
                ret = replica_srv_read(net_getnid(), io, &strips[i]);
                why = "replica local read fail";
        } else {
                DBUG("read "CHKID_FORMAT" %u %llu clock %ld remote @ %s\n",
                     CHKID_ARG(&io->id), io->size, (LLU)io->offset, io->vclock.clock, network_rname(nid));
                ret = replica_rpc_read(nid, io, &strips[i]);
                why = "replica rpc read fail";
        }

        if (unlikely(ret)) {
                if (ret == EPERM) {
                        ret = EAGAIN;
                        network_close(nid, why, &ltime);
                }

                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Rebuild the strips flagged in src_in_err from the remaining ones.
 *
 * Each flagged strip buffer is released first and then refilled one
 * STRIP_BLOCK at a time.  For every row the readable strips are copied
 * into aligned scratch blocks, ec_decode() runs, and the scratch blocks
 * of the flagged strips are appended to them.  off is unused.
 *
 * Returns 0 on success, or the error from posix_memalign() / ec_decode().
 */
STATIC int __ec_strips_decode(const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *strips, unsigned char *src_in_err)
{
        int ret;
        uint32_t i, j;
        char *scratch[EC_MMAX] = {0};
        char *block;

        (void) off;

        /* one aligned scratch block per strip column */
        for (i = 0; i < ec->m; i++) {
                ret = posix_memalign((void **)&block, STRIP_ALIGN, STRIP_BLOCK);
                if (ret)
                        GOTO(err_free, ret);

                scratch[i] = block;
        }

        for (i = 0; i < ec->m; i++) {
                if (src_in_err[i])
                        mbuffer_free(&strips[i]);
        }

        for (i = 0; i < count; i++) {
                for (j = 0; j < ec->m; j++) {
                        if (src_in_err[j])
                                continue;

                        mbuffer_read(&strips[j], scratch[j], i * STRIP_BLOCK, STRIP_BLOCK);
                }

                ret = ec_decode(src_in_err, &scratch[0], &scratch[ec->k], STRIP_BLOCK, ec->m, ec->k);
                if (ret)
                        GOTO(err_free, ret);

                for (j = 0; j < ec->m; j++) {
                        if (src_in_err[j])
                                mbuffer_copy(&strips[j], scratch[j], STRIP_BLOCK);
                }
        }

        for (i = 0; i < ec->m; i++) {
                if (src_in_err[i])
                        YASSERT(strips[i].len == count * STRIP_BLOCK);
        }

        /* success falls through into the shared cleanup */
        ret = 0;
err_free:
        for (i = 0; i < ec->m; i++)
                free(scratch[i]);

        return ret;
}

/*
 * Read every strip not already flagged in src_in_err, using the per-replica
 * vclock from clocks[], and rebuild whatever is missing.
 *
 * A replica whose read fails is flagged in src_in_err.  More than
 * ec->m - ec->k missing strips yields ENONET.  When something is missing
 * the strips are decoded, and if any strip at index >= ec->k was missing
 * the strips are additionally re-encoded.
 */
STATIC int __ec_strips_recovery(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const clockstat_t *clocks, const io_t *_io,
                const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *strips, unsigned char *src_in_err)
{
        int ret, i, err = 0;
        io_t io = *_io;

        /* per-replica io: same strip rows on every replica */
        io.offset = off * STRIP_BLOCK;
        io.size = count * STRIP_BLOCK;

        for (i = 0; i < ec->m; i++) {
                if (!src_in_err[i]) {
                        io.vclock = clocks[i].vclock;
                        ret = __ec_replica_read(chkinfo, chkstat, &io, i, strips);
                        if (unlikely(ret))
                                src_in_err[i] = 1;
                }

                if (src_in_err[i])
                        err += 1;
        }

        if (err > ec->m - ec->k) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        if (err == 0)
                return 0;

        ret = __ec_strips_decode(ec, off, count, strips, src_in_err);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* look for the first missing strip at index >= ec->k */
        for (i = ec->k; i < ec->m; i++) {
                if (src_in_err[i])
                        break;
        }

        if (i < ec->m) {
                ret = __ec_strips_encode(ec, off, count, strips);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Fetch count strip rows starting at row off from the replicas.
 *
 * The first ec->k strips are read first.  If all of them succeed the
 * remaining strips are not read at all.  Otherwise the strips from
 * ec->k up to ec->m are read as well and the failed ones are rebuilt
 * with __ec_strips_decode().  More than ec->m - ec->k failed reads
 * yields ENONET.
 */
STATIC int __ec_strips_pull(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const io_t *_io, const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *strips)
{
        int ret, i, err = 0;
        unsigned char src_in_err[EC_MMAX] = {0};
        io_t io = *_io;

        io.offset = off * STRIP_BLOCK;
        io.size = count * STRIP_BLOCK;

        /* pass 1: strips 0 .. ec->k - 1 */
        for (i = 0; i < ec->k; i++) {
                ret = __ec_replica_read(chkinfo, chkstat, &io, i, strips);
                if (unlikely(ret)) {
                        src_in_err[i] = 1;
                        err += 1;
                }
        }

        if (err > ec->m - ec->k) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        if (err == 0)
                return 0;

        /* pass 2: strips ec->k .. ec->m - 1, needed for the rebuild */
        for (i = ec->k; i < ec->m; i++) {
                ret = __ec_replica_read(chkinfo, chkstat, &io, i, strips);
                if (unlikely(ret)) {
                        src_in_err[i] = 1;
                        err += 1;
                }
        }

        if (err > ec->m - ec->k) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        ret = __ec_strips_decode(ec, off, count, strips, src_in_err);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Copy the bytes addressed by io (io->offset, io->size) out of the pulled
 * strips and append them to buf in logical order.
 *
 * strips[j] holds column j of count strip rows starting at row off; row i
 * of column j covers the logical range starting at
 * (off + i) * ec->k * STRIP_BLOCK + j * STRIP_BLOCK.
 *
 * Blocks that do not overlap the request at all (entirely before OR
 * entirely after it) are skipped up front.  Previously the "entirely
 * after" test came last in the chain and was shadowed by the preceding
 * branch, so for such a block the copy length was computed as
 * (io->offset + io->size) - strip_off, which underflows as unsigned.
 *
 * Always returns 0; the final lengths are checked with YASSERT.
 */
STATIC int __ec_read_strip(const io_t *io, const ec_t *ec,
                uint32_t off, uint32_t count, buffer_t *strips, buffer_t *buf)
{
        /* scratch taken from the 4K cache; assumes STRIP_BLOCK fits in it — TODO confirm */
        char *tmp = mem_cache_calloc(MEM_CACHE_4K, 1);
        uint32_t i, j, strip_off, buf_off = 0, read_off, read_size;

        for (i = 0; i < count; i++) {
                for (j = 0; j < ec->k; j++) {
                        /*
                         * x:superfluous    v:need read
                         * | .... | .... | .... |  --> off  \
                         * | .... | .... | .... |            |--> count
                         * | .... | curr | .... |           /
                         *        |   `--current strip block
                         *        `--strip_off
                         */

                        strip_off = (off + i) * (ec->k * STRIP_BLOCK) + j * STRIP_BLOCK;
                        read_off = i * STRIP_BLOCK;

                        if (strip_off + STRIP_BLOCK <= io->offset
                            || strip_off >= io->offset + io->size) {
                                /* | .... | xxxx | ..vv |  or  | vv.. | xxxx | .... | */
                                read_size = 0;
                        } else {
                                if (strip_off < io->offset) {
                                        /* request starts inside this block */
                                        read_off = read_off + (io->offset - strip_off);

                                        if (strip_off + STRIP_BLOCK <= io->offset + io->size) {
                                                /* | .... | xxvv | vv.. | */
                                                read_size = STRIP_BLOCK - (io->offset - strip_off);
                                        } else {
                                                /* | .... | xvvx | .... | */
                                                read_size = io->size;
                                        }
                                } else if (strip_off + STRIP_BLOCK <= io->offset + io->size) {
                                        /* | ..vv | vvvv | vv.. | */
                                        read_size = STRIP_BLOCK;
                                } else {
                                        /* | ..vv | vvxx | xx.. | */
                                        read_size = (io->offset + io->size) - strip_off;
                                }

                                mbuffer_read(&strips[j], tmp, read_off, read_size);
                                mbuffer_copy(buf, tmp, read_size);
                        }

                        buf_off += read_size;
                        if (buf_off >= io->size)
                                break;
                }
        }

        YASSERT(buf_off == io->size);
        YASSERT(buf->len == io->size);

        mem_cache_free(MEM_CACHE_4K, tmp);
        return 0;
}

/*
 * Read io->size bytes at io->offset from an erasure-coded chunk into buf.
 *
 * Chunks for which eclog_chunk_islog() is true are handed to
 * chunk_proto_rep_read() instead.  Otherwise the strip rows covering the
 * request are computed, pulled from the replicas (with missing strips
 * rebuilt by the pull helper), and the requested bytes are extracted
 * into buf.  The strip buffers are freed on every path past their
 * initialisation.
 *
 * Returns 0 on success or the error reported by the helpers.
 */
int chunk_proto_ec_read(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                     const io_t *io, buffer_t *buf, const ec_t *ec)
{
        int ret;
        uint32_t off = 0, count = 0;
        buffer_t strips[EC_MMAX];

        /*
         * NOTE(review): `pool` is neither a parameter nor a local of this
         * function; confirm where it is expected to come from.
         */
        if (eclog_chunk_islog(&chkinfo->id, ec)) {
                return chunk_proto_rep_read(pool, chkinfo, chkstat, vfm, io, buf, NULL);
        }
        
        DBUG("chunk pre read. ioinfo chunnkid(id(%llu), type(%u), idx(%u)), clock(%llu), "
                "offset(%llu), size(%u), flags(%u), lsn(%llu), chkinfo chunkdi(id(%llu), "
                "type(%u), idx(%u)), magic(%u), repnum(%u), mtime(%u), infover(%llu), "
                "snapver(%llu), chkstat_clock(%llu), read(%u), write(%u)\n",
                (LLU)io->id.id, io->id.type, io->id.idx, (LLU)io->vclock.clock, (LLU)io->offset,
                io->size, io->flags, (LLU)io->lsn, (LLU)chkinfo->id.id, chkinfo->id.type, 
                chkinfo->id.idx, chkinfo->magic, chkinfo->repnum, chkinfo->mtime, 
                (LLU)chkinfo->info_version, (LLU)chkinfo->snap_version,
                (LLU)chkstat->chkstat_clock, chkstat->read, chkstat->write);

        (void) vfm;
        YASSERT(chkinfo->repnum == ec->m);

        /* strips start out empty; the pull fills them from the replicas */
        __ec_strips_init(io, ec, &off, &count, strips, FALSE, FALSE);

        ret = __ec_strips_pull(chkinfo, chkstat, io, ec, off, count, strips);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __ec_read_strip(io, ec, off, count, strips, buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __ec_strips_free(strips);

        return 0;
err_ret:
        __ec_strips_free(strips);
        return ret;
}

/*
 * Recompute the strips at index ec->k .. ec->m - 1 from strips 0 .. ec->k - 1.
 *
 * The strips being recomputed are released first and then refilled one
 * STRIP_BLOCK per row: the first ec->k strips of the row are copied into
 * aligned scratch blocks, ec_encode() runs, and the resulting blocks are
 * appended.  off is only logged.
 *
 * Returns 0 on success, or the error from posix_memalign() / ec_encode().
 */
STATIC int __ec_strips_encode(const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *strips)
{
        int ret;
        uint32_t i, j;
        char *scratch[EC_MMAX] = {0};
        char *block;

        DBUG("ec strips encode. ecinfo m(%u), plugin(%u), k(%u), tech(%u), off(%u), "
                "count(%u)\n", ec->m, ec->plugin, ec->k, ec->tech, off, count);

        (void) off;

        /* one aligned scratch block per strip column */
        for (i = 0; i < ec->m; i++) {
                ret = posix_memalign((void **)&block, STRIP_ALIGN, STRIP_BLOCK);
                if (ret)
                        GOTO(err_free, ret);

                scratch[i] = block;
        }

        for (i = ec->k; i < ec->m; i++)
                mbuffer_free(&strips[i]);

        for (i = 0; i < count; i++) {
                for (j = 0; j < ec->k; j++)
                        mbuffer_read(&strips[j], scratch[j], i * STRIP_BLOCK, STRIP_BLOCK);

                ret = ec_encode(&scratch[0], &scratch[ec->k], STRIP_BLOCK, ec->m, ec->k);
                if (ret)
                        GOTO(err_free, ret);

                for (j = ec->k; j < ec->m; j++)
                        mbuffer_copy(&strips[j], scratch[j], STRIP_BLOCK);
        }

        for (i = ec->k; i < ec->m; i++) {
                YASSERT(strips[i].len == count * STRIP_BLOCK);
        }

        /* success falls through into the shared cleanup */
        ret = 0;
err_free:
        for (i = 0; i < ec->m; i++)
                free(scratch[i]);

        return ret;
}

/*
 * Write strips[i] to replica i of the chunk.
 *
 * A local replica is written through chunk_local_write() using ctx, which
 * must be non-NULL.  A remote replica is written with replica_rpc_write()
 * and ctx must be NULL.
 *
 * Returns ENONET when the replica's ltime is 0, EAGAIN when the replica
 * location has a non-zero status, the network_connect() error for a
 * remote replica, or the rpc write error (after closing the connection).
 */
STATIC int __ec_replica_write(chunk_local_write_ctx_t *ctx,
                const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                io_t *io, uint32_t i, buffer_t *strips)
{
        int ret;
        time_t ltime;
        const reploc_t *reploc = &chkinfo->diskid[i];
        const repstat_t *repstat = &chkstat->repstat[i];
        const nid_t *nid = &reploc->id;

        if (unlikely(repstat->ltime == 0)) {
                DBUG("chunk "CHKID_FORMAT"@ %s not connected\n",
                                CHKID_ARG(&chkinfo->id), network_rname(&reploc->id));
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        if (unlikely(reploc->status != 0)) {
                DBUG("chunk "CHKID_FORMAT"@ %s status %u\n",
                                CHKID_ARG(&chkinfo->id), network_rname(&reploc->id),
                                reploc->status);
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = network_connect(nid, &ltime, 1, 0);
        if (unlikely(ret)) {
                if (!net_islocal(nid)) {
                        GOTO(err_ret, ret);
                }

                /* a failed connect to the local node is not handled */
                UNIMPLEMENTED(__DUMP__);
        }

        if (!net_islocal(nid)) {
                YASSERT(ctx == NULL);
                DBUG("write "CHKID_FORMAT" %u %llu clock %ld remote @ %s\n",
                     CHKID_ARG(&io->id), io->size, (LLU)io->offset, io->vclock.clock, network_rname(nid));
                ret = replica_rpc_write(nid, io, &strips[i], repstat->magic);
                if (unlikely(ret)) {
                        network_close(nid, "replica rpc write fail", &ltime);
                        GOTO(err_ret, ret);
                }

                return 0;
        }

        YASSERT(ctx != NULL);
        DBUG("write "CHKID_FORMAT" %u %llu clock %ld local\n",
             CHKID_ARG(&io->id), io->size, (LLU)io->offset, io->vclock.clock);
        chunk_local_write(ctx, io, &strips[i], repstat->magic, ltime);

        return 0;
err_ret:
        return ret;
}

/*
 * Overlay the caller's data (buf, addressed by io->offset / io->size) onto
 * the strips and then recompute the encoded strips with
 * __ec_strips_encode().
 *
 * strips[j] holds column j of count strip rows starting at row off; row i
 * of column j covers the logical range starting at
 * (off + i) * ec->k * STRIP_BLOCK + j * STRIP_BLOCK.
 *
 * Blocks that do not overlap the request at all (entirely before OR
 * entirely after it) are skipped up front.  Previously the "entirely
 * after" test came last in the chain and was shadowed by the preceding
 * branch, so for such a block the copy length was computed as
 * (io->offset + io->size) - strip_off, which underflows as unsigned.
 *
 * Returns 0 on success or the error from __ec_strips_encode().
 */
STATIC int __ec_write_strip(const io_t *io, const ec_t *ec,
                uint32_t off, uint32_t count, buffer_t *strips, const buffer_t *buf)
{
        int ret;
        /* scratch taken from the 4K cache; assumes STRIP_BLOCK fits in it — TODO confirm */
        char *tmp = mem_cache_calloc(MEM_CACHE_4K, 1);
        uint32_t i, j, strip_off, buf_off = 0, write_off, write_size;

        for (i = 0; i < count; i++) {
                for (j = 0; j < ec->k; j++) {
                        /*
                         * x:superfluous    v:need write
                         * | .... | .... | .... |  --> off  \
                         * | .... | .... | .... |            |--> count
                         * | .... | curr | .... |           /
                         *        |   `--current strip block
                         *        `--strip_off
                         */

                        strip_off = (off + i) * (ec->k * STRIP_BLOCK) + j * STRIP_BLOCK;
                        write_off = i * STRIP_BLOCK;

                        if (strip_off + STRIP_BLOCK <= io->offset
                            || strip_off >= io->offset + io->size) {
                                /* | .... | xxxx | ..vv |  or  | vv.. | xxxx | .... | */
                                write_size = 0;
                        } else {
                                if (strip_off < io->offset) {
                                        /* request starts inside this block */
                                        write_off = write_off + (io->offset - strip_off);

                                        if (strip_off + STRIP_BLOCK <= io->offset + io->size) {
                                                /* | .... | xxvv | vv.. | */
                                                write_size = STRIP_BLOCK - (io->offset - strip_off);
                                        } else {
                                                /* | .... | xvvx | .... | */
                                                write_size = io->size;
                                        }
                                } else if (strip_off + STRIP_BLOCK <= io->offset + io->size) {
                                        /* | ..vv | vvvv | vv.. | */
                                        write_size = STRIP_BLOCK;
                                } else {
                                        /* | ..vv | vvxx | xx.. | */
                                        write_size = (io->offset + io->size) - strip_off;
                                }

                                mbuffer_read(buf, tmp, buf_off, write_size);
                                mbuffer_write(&strips[j], tmp, write_off, write_size);
                        }

                        buf_off += write_size;
                        if (buf_off >= io->size)
                                break;
                }
        }

        YASSERT(buf_off == io->size);

        mem_cache_free(MEM_CACHE_4K, tmp);

        ret = __ec_strips_encode(ec, off, count, strips);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Push count strip rows starting at row off to all ec->m replicas.
 *
 * The first local replica (if any) is started through the local write
 * context, then every non-local replica is written, and finally the
 * local write is waited for.  A failing remote write is only counted as
 * missing; the call returns EAGAIN unless ec->m writes succeeded.
 * Errors starting or waiting for the local write are returned directly.
 */
STATIC int __ec_write_commit(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const io_t *_io, const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *strips)
{
        int ret, i, success = 0, local = 0;
        const nid_t *nid;
        chunk_local_write_ctx_t ctx;
        io_t io = *_io;

        io.offset = off * STRIP_BLOCK;
        io.size = count * STRIP_BLOCK;

        /* start the first local replica, if there is one */
        for (i = 0; i < ec->m; i++) {
                nid = &chkinfo->diskid[i].id;
                if (!net_islocal(nid))
                        continue;

                ret = __ec_replica_write(&ctx, chkinfo, chkstat, &io, i, strips);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                local = 1;
                break;
        }

        /* remote replicas */
        for (i = 0; i < ec->m; i++) {
                nid = &chkinfo->diskid[i].id;
                if (net_islocal(nid))
                        continue;

                ret = __ec_replica_write(NULL, chkinfo, chkstat, &io, i, strips);
                if (unlikely(ret))
                        continue;

                success++;
        }

        if (local) {
                ret = chunk_local_write_wait(&ctx);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                success++;
        }

        if (unlikely(success != ec->m)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Write buf at _io->offset / _io->size into an erasure-coded chunk.
 *
 * Chunks for which eclog_chunk_islog() is true are handed to the
 * replicated write path.  Otherwise this is a read-modify-write of whole
 * strip rows: the affected rows are pulled with the vclock decremented
 * by one, the new data is overlaid and the strips re-encoded, and the
 * rows are committed to every replica.  The strip buffers are freed on
 * every path past their initialisation.
 *
 * Returns 0 on success or the error reported by the helpers.
 */
int chunk_proto_ec_write(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                      const io_t *_io, const buffer_t *buf, const ec_t *ec)
{
        int ret;
        io_t io;
        uint32_t off = 0, count = 0;
        buffer_t strips[EC_MMAX];

        if (eclog_chunk_islog(&chkinfo->id, ec)) {
                /*
                 * This used to call chunk_proto_rep_read() with the still
                 * uninitialised local `io` passed by value; a write has to
                 * go to the write routine with the caller's io.
                 *
                 * NOTE(review): assumes chunk_proto_rep_write() exists and
                 * mirrors the argument list used for chunk_proto_rep_read()
                 * -- confirm against chunk_proto.h.
                 * NOTE(review): `pool` is neither a parameter nor a local of
                 * this function; confirm where it is expected to come from.
                 */
                return chunk_proto_rep_write(pool, chkinfo, chkstat, vfm, _io, buf, NULL);
        }

        (void) vfm;
        YASSERT(chkinfo->repnum == ec->m);

        __ec_strips_init(_io, ec, &off, &count, strips, FALSE, FALSE);

        /* pull the existing rows using the clock preceding this write */
        io = *_io;
        io.vclock.clock--;

        ret = __ec_strips_pull(chkinfo, chkstat, &io, ec, off, count, strips);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __ec_write_strip(_io, ec, off, count, strips, buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __ec_write_commit(chkinfo, chkstat, _io, ec, off, count, strips);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __ec_strips_free(strips);

        return 0;
err_ret:
        __ec_strips_free(strips);
        return ret;
}

/*
 * Public wrapper around __ec_strips_init(): computes *off / *count for io
 * and initialises the strip buffers without allocating or zero-filling
 * them (both flags are passed as FALSE).
 */
void chunk_proto_ec_strips_init(const io_t *io, const ec_t *ec,
                uint32_t *off, uint32_t *count, buffer_t *strips)
{
        __ec_strips_init(io, ec, off, count, strips, FALSE, FALSE);
}

/* Public wrapper around __ec_strips_free(): releases the strip buffers. */
void chunk_proto_ec_strips_free(buffer_t *strips)
{
        __ec_strips_free(strips);
}

/*
 * Public wrapper around __ec_strips_pull(): fetches count strip rows
 * starting at row off into strips.  Asserts that the chunk's replica
 * count equals ec->m.  Returns 0 or the pull error.
 */
int chunk_proto_ec_write_pull(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const io_t *io, const ec_t *ec, uint32_t off, uint32_t count, buffer_t *strips)
{
        int ret;

        YASSERT(chkinfo->repnum == ec->m);

        ret = __ec_strips_pull(chkinfo, chkstat, io, ec, off, count, strips);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public wrapper around __ec_write_strip(): overlays buf onto strips and
 * re-encodes them.  Note the argument order differs from the internal
 * helper (buf comes before ec here).  Returns 0 or the helper's error.
 */
int chunk_proto_ec_write_strip(const io_t *io, const buffer_t *buf, const ec_t *ec,
                uint32_t off, uint32_t count, buffer_t *strips)
{
        int ret;

        ret = __ec_write_strip(io, ec, off, count, strips, buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Public wrapper around __ec_write_commit(): writes the strip rows to the
 * replicas.  Asserts that the chunk's replica count equals ec->m.
 * Returns 0 or the commit error.
 */
int chunk_proto_ec_write_commit(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                      const io_t *io, const ec_t *ec, uint32_t off, uint32_t count, buffer_t *strips)
{
        int ret;

        YASSERT(chkinfo->repnum == ec->m);

        ret = __ec_write_commit(chkinfo, chkstat, io, ec, off, count, strips);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Create one erasure-coded chunk on the repnum nodes listed in nids.
 *
 * Chunks for which eclog_chunk_islog() is true are created through
 * chunk_proto_rep_create() instead.
 *
 * Without a payload (buf == NULL) every replica is created with a NULL
 * buffer.  With a payload, buf is laid out from offset 0 across the
 * strips and encoded, and replica i is created with strips[i].  Local
 * nodes go through replica_srv_create(), remote ones through
 * replica_rpc_create().  The first failing create aborts the call.
 *
 * Returns 0 on success or the error of the failing step.
 */
int chunk_proto_ec_create(const char *pool, const chkid_t *chkid, int chknum,
                       const nid_t *nids, int repnum, const fileid_t *parent,
                       const nid_t *parentnid, int tier, int initzero, uint64_t info_version,
                       const buffer_t *buf, const ec_t *ec)
{
        int ret, i;
        const nid_t *nid;
        io_t io;
        uint32_t off = 0, count = 0;
        buffer_t strips[EC_MMAX];

        if (eclog_chunk_islog(chkid, ec)) {
                return chunk_proto_rep_create(pool, chkid, chknum, nids, repnum,
                                              parent, parentnid, tier, initzero,
                                              info_version, buf, NULL);
        }

        YASSERT(tier == -1 || tier == 0 || tier == 1);

        YASSERT(chknum == 1);
        YASSERT(parent);
        YASSERT(parent->type == __VOLUME_CHUNK__);
        YASSERT(chkid_cmp(parent, chkid));
        YASSERT(chkid->id);
        if (eclog_chunk_islog(chkid, ec)) {
                YASSERT(repnum == ec->m - ec->k + 1);
        } else {
                YASSERT(repnum == ec->m);
        }

        DBUG("create chunk "CHKID_FORMAT" @ %s/"CHKID_FORMAT"\n",
             CHKID_ARG(chkid), network_rname(parentnid), CHKID_ARG(parent));

        if (!buf) {
                /* no payload: nothing to stripe, create the replicas as they are */
                for (i = 0; i < (int)repnum; i++) {
                        nid = &nids[i];
                        if (net_islocal(nid))
                                ret = replica_srv_create(pool, nid, chkid, 1, parent, tier, initzero, buf,
                                                         info_version, 0);
                        else
                                ret = replica_rpc_create(pool, nid, chkid, 1, parent, tier, initzero, buf,
                                                         info_version, 0);

                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                return 0;
        }

        /* payload starts at the beginning of the chunk */
        io.offset = 0;
        io.size = buf->len;
        __ec_strips_init(&io, ec, &off, &count, strips, TRUE, initzero);

        ret = __ec_write_strip(&io, ec, off, count, strips, buf);
        if (unlikely(ret))
                GOTO(err_free, ret);

        for (i = 0; i < (int)repnum; i++) {
                nid = &nids[i];
                if (net_islocal(nid))
                        ret = replica_srv_create(pool, nid, chkid, 1, parent, tier, initzero, &strips[i],
                                                 info_version, 0);
                else
                        ret = replica_rpc_create(pool, nid, chkid, 1, parent, tier, initzero, &strips[i],
                                                 info_version, 0);

                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        __ec_strips_free(strips);

        return 0;
err_free:
        __ec_strips_free(strips);
err_ret:
        return ret;
}

/*
 * Collect the clock state of every usable replica of the chunk.
 *
 * Replicas with a non-zero status, or that cannot be connected to, are
 * skipped.  For a replica whose recorded ltime matches the connection's
 * ltime the clock is fetched with chunk_getclock(); otherwise the chunk
 * is (re)connected with chunk_connect(), where ENOENT / EIO / ENODEV
 * just skip the replica.  Each collected clock is stored in clocks[i].
 *
 * On return *clockstat holds the highest vclock seen and a dirty value
 * taken from any replica that reported one.  Returns ENONET when fewer
 * than ec->k replicas were collected.
 */
STATIC int __chunk_ec_get_clean1(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const fileid_t *parent, clockstat_t *clockstat, clockstat_t *clocks, const ec_t *ec)
{
        int ret, i, found = 0;
        const reploc_t *reploc;
        time_t ltime;
        clockstat_t newest = {{0, 0}, 0};
        repstat_t repstat;

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                reploc = &chkinfo->diskid[i];
                if (reploc->status)
                        continue;

                ret = network_connect(&reploc->id, &ltime, 1, 0);
                if (unlikely(ret))
                        continue;

                if (unlikely(chkstat->repstat[i].ltime != ltime)) {
                        ret = chunk_connect(&reploc->id, &chkinfo->id, parent, 0,
                                              clockstat, &repstat);
                        if (unlikely(ret)) {
                                if (ret != ENOENT && ret != EIO && ret != ENODEV)
                                        GOTO(err_ret, ret);

                                DBUG(CHKID_FORMAT" connect %s fail\n", CHKID_ARG(&chkinfo->id),
                                     network_rname(&reploc->id));
                                continue;
                        }
                } else {
                        ret = chunk_getclock(&reploc->id, &chkinfo->id, clockstat);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                /* keep the highest vclock and remember any dirty mark */
                if (found == 0 || clockstat->vclock.clock > newest.vclock.clock)
                        newest.vclock = clockstat->vclock;
                if (clockstat->dirty)
                        newest.dirty = clockstat->dirty;

                clocks[i] = *clockstat;
                found++;
        }

        *clockstat = newest;

        if (unlikely(found < ec->k)) {
                ret = ENONET;
                CHKINFO_DUMP(chkinfo, D_INFO);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Connect to every usable replica of the chunk and collect its clock and
 * replica state.
 *
 * Replicas with a non-zero status, or that cannot be connected to, are
 * skipped, as are replicas whose chunk_connect() fails with ENOENT / EIO
 * / ENODEV.  The first pass also skips replicas that report a dirty
 * clock.  If that pass collects fewer than ec->k replicas, a second pass
 * accepts dirty replicas as well.
 *
 * Every pass starts counting from zero.  Previously `found` and the
 * accumulated clock were carried over into the second pass, so replicas
 * accepted in the first pass were counted twice and the ec->k threshold
 * could be met with too few distinct replicas.
 *
 * On success *clockstat holds the highest vclock seen and a dirty value
 * taken from any accepted replica that reported one; clocks[i] and
 * chkstat->repstat[i] are updated for each accepted replica.  Returns
 * ENONET when fewer than ec->k replicas could be collected.  force is
 * unused.
 */
STATIC int __chunk_ec_get_clean2(const chkinfo_t *chkinfo, chkstat_t *chkstat,
                const fileid_t *parent, clockstat_t *clockstat, clockstat_t *clocks, ec_t *ec, int force)
{
        int ret, i, found;
        const reploc_t *reploc;
        time_t ltime;
        const clockstat_t clock_zero = {{0, 0}, 0};
        clockstat_t tmp;
        repstat_t repstat;
        int dirty_flag=1;
        (void) force;

dirty_retry:
        /* each pass is evaluated on its own */
        found = 0;
        tmp = clock_zero;

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                reploc = &chkinfo->diskid[i];
                if (reploc->status)
                        continue;

                ret = network_connect(&reploc->id, &ltime, 1, 0);
                if (unlikely(ret))
                        continue;

                ret = chunk_connect(&reploc->id, &chkinfo->id, parent, 0,
                                      clockstat, &repstat);
                if (unlikely(ret)) {
                        if ((ret == ENOENT) || (ret == EIO || ret == ENODEV)) {
                                DBUG(CHKID_FORMAT" connect %s fail\n", CHKID_ARG(&chkinfo->id),
                                     network_rname(&reploc->id));
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                /* first pass only: leave dirty replicas out */
                if (dirty_flag && clockstat->dirty) {
                        dirty_flag++;
                        continue;
                }

                if (found == 0 || clockstat->vclock.clock > tmp.vclock.clock)
                        tmp.vclock = clockstat->vclock;
                if (clockstat->dirty)
                        tmp.dirty = clockstat->dirty;

                clocks[i] = *clockstat;
                chkstat->repstat[i] = repstat;
                found++;
        }

        *clockstat = tmp;

        if (unlikely(found < ec->k)) {
                if(dirty_flag) {
                        /* not enough clean replicas: retry accepting dirty ones */
                        dirty_flag = 0;
                        goto  dirty_retry;
                }
                ret = ENONET;
                CHKINFO_DUMP(chkinfo, D_INFO);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Push `buf` to the replica `to`, stamping it with `vclock`.
 *
 * Thin wrapper around chunk_push_with() that fills in the pool, chunk id,
 * info_version, parent, tier and flags carried by `args`.  The return value
 * of chunk_push_with() is passed through unchanged.
 */
static int __chunk_ec_push(args_t *args, const buffer_t *buf, const nid_t *to, const vclock_t *vclock)
{
        chkinfo_t *info = args->chkinfo;

        return chunk_push_with(args->pool, &info->id, buf, to, vclock,
                               info->info_version, &args->parent,
                               args->tier, args->flags);
}

/*
 * Push `buf` for replica slot `_to` through chunk_push_newdisk_with().
 *
 * Unlike __chunk_ec_push() the whole chkinfo and the slot index are handed
 * over instead of a fixed target nid.  The callee's return value is passed
 * through unchanged.
 */
static int __chunk_ec_push_newdisk(args_t *args, const buffer_t *buf, int _to, const vclock_t *vclock)
{
        int ret;

        ret = chunk_push_newdisk_with(args->pool, args->chkinfo, buf, _to,
                                      vclock, &args->parent,
                                      args->tier, args->flags);

        return ret;
}

/*
 * Probe whether `diskid` is reachable.
 *
 * Returns 1 when network_connect() succeeds (it is also given `ltime` to
 * fill in), 0 when it reports any error.  The error code itself is dropped.
 */
STATIC int __chunk_ec_isonline(const diskid_t *diskid, time_t *ltime)
{
        if (unlikely(network_connect(diskid, ltime, 1, 0) != 0))
                return 0;

        return 1;
}

/*
 * Compare two clock states of chunk `chkid`.
 *
 * Returns 1 only if neither side is dirty and both vclock.clock and
 * vclock.vfm match; otherwise logs the mismatch (the log line does not
 * include the vfm values) and returns 0.  `nid2` is used for logging only.
 */
STATIC int __chunk_ec_equal(const chkid_t *chkid,
                         const clockstat_t *clockstat1,
                         const nid_t *nid2, const clockstat_t *clockstat2)
{
        int same;

        same = !clockstat1->dirty && !clockstat2->dirty
               && (clockstat1->vclock.clock == clockstat2->vclock.clock)
               && (clockstat1->vclock.vfm == clockstat2->vclock.vfm);
        if (same)
                return 1;

        DINFO("check "CHKID_FORMAT" vclock %llu, %u --> %s:%llu, %u\n",
              CHKID_ARG(chkid),
              (LLU)clockstat1->vclock.clock, clockstat1->dirty,
              network_rname(nid2), (LLU)clockstat2->vclock.clock, clockstat2->dirty);

        return 0;
}

/*
 * Linear search for `nid` in the first `count` entries of `arr`.
 *
 * Returns the index of the first entry for which nid_cmp() yields 0,
 * or -1 when there is none.
 */
STATIC int __chunk_ec_exist(const nid_t *arr, int count, const nid_t *nid)
{
        int idx = 0;

        while (idx < count) {
                if (nid_cmp(&arr[idx], nid) == 0)
                        return idx;

                idx++;
        }

        return -1;
}

/*
 * Re-validate replica slot `_to` whose node is reachable.
 *
 * The replica is connected first; if its clock state differs from the
 * reference `clockstat` (see __chunk_ec_equal) `buf` is pushed to it.
 * A connect that fails with EIO or ENODEV on the first pass triggers an
 * unconditional push followed by exactly one more connect attempt.
 *
 * On success the repstat obtained by the last chunk_connect() is stored in
 * *_repstat and 0 is returned.  On failure the error of the failing call is
 * returned and *_repstat is left untouched.
 */
STATIC int __chunk_ec_clean_online(args_t *args, buffer_t *buf, int _to,
                                const clockstat_t *clockstat, repstat_t *_repstat)
{
        int ret, forced, attempts = 0;
        clockstat_t remote;
        repstat_t repstat;
        chkinfo_t *chkinfo = args->chkinfo;
        const chkid_t *chkid = &chkinfo->id;
        const nid_t *to = &chkinfo->diskid[_to].id;

        for (;;) {
                forced = 0;
                ret = chunk_connect(to, chkid, &args->parent, 0, &remote, &repstat);
                if (unlikely(ret)) {
                        /* only EIO/ENODEV on the very first pass is recoverable */
                        if (attempts != 0 || (ret != EIO && ret != ENODEV))
                                GOTO(err_ret, ret);

                        forced = 1;
                }

                /* `remote` is only valid when the connect succeeded */
                if (!forced && __chunk_ec_equal(chkid, clockstat, to, &remote))
                        break;

                ret = __chunk_ec_push(args, buf, to, &clockstat->vclock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (!forced)
                        break;

                /* cannot fire: a forced push only happens while attempts == 0 */
                if (attempts) {
                        DWARN(""CHKID_FORMAT" retry %u\n", CHKID_ARG(&chkinfo->id), attempts);
                }

                attempts++;
        }

        *_repstat = repstat;

        return 0;
err_ret:
        return ret;
}

/*
 * Rebuild replica slot `_to` when its current location cannot be used.
 *
 * Only acts for __OP_WRITE / __OP_READ; any other op is a logged no-op.
 * The data in `buf` is pushed through __chunk_ec_push_newdisk(); on success
 * the slot is reconnected, its repstat recorded in chkstat and its status
 * reset to clean.
 *
 * ENOSPC from the push is handled specially:
 *   - volume chunk with repnum <= gloconf.force_write_repnum: return EAGAIN;
 *   - otherwise the slot is marked __S_DIRTY; with force == 0 a background
 *     sync is scheduled via chunk_bh_sync() and 0 is returned, with
 *     force != 0 ENOSPC is returned.
 *
 * A failed push never leads to the slot being marked clean.
 *
 * Returns 0 on success (or deferred sync), an errno-style code otherwise.
 */
STATIC int __chunk_ec_clean_offline(args_t *args, buffer_t *buf, int _to,
                                 const clockstat_t *clockstat, int force)
{
        int ret;
        const nid_t *to;
        repstat_t repstat;
        clockstat_t clockstat2;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;

        to = &chkinfo->diskid[_to].id;
        if (args->op != __OP_WRITE && args->op != __OP_READ) {
                DBUG(""CHKID_FORMAT" noop\n", CHKID_ARG(&chkinfo->id));
                goto out;
        }

        DINFO(""CHKID_FORMAT" @ %s need sync immediately\n", CHKID_ARG(&chkinfo->id),
              network_rname(to));
        ret = __chunk_ec_push_newdisk(args, buf, _to, &clockstat->vclock);
        if (unlikely(ret)) {
                if (ret != ENOSPC)
                        GOTO(err_ret, ret);

                if (chkid_isvol(&chkinfo->id) && chkinfo->repnum <= gloconf.force_write_repnum) {
                        DWARN(""CHKID_FORMAT" @ %s need space\n", CHKID_ARG(&chkinfo->id),
                              network_rname(to));
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }

                DWARN(""CHKID_FORMAT" @ %s set dirty, force:%d\n", CHKID_ARG(&chkinfo->id),
                      network_rname(to), force);
                chkinfo_set_status(chkinfo, _to, __S_DIRTY);
                if (force == 0) {
                        chunk_bh_sync(&args->parent, &chkinfo->id, 0);
                        goto out;
                }

                /*
                 * Forced sync with no space left: the data was not written,
                 * so report the failure (as the dirty/needcheck offline
                 * paths do) and keep the slot dirty.
                 */
                GOTO(err_ret, ret);
        }

        ret = chunk_connect(to, &chkinfo->id, &args->parent, 0, &clockstat2, &repstat);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        chkstat->repstat[_to] = repstat;
        chkinfo_set_status(chkinfo, _to, 0);
        YASSERT(chkinfo->diskid[_to].status == __S_CLEAN);

out:
        return 0;
err_ret:
        return ret;
}

/*
 * Handle a replica slot whose recorded status is clean.
 *
 *   - node offline: delegate to __chunk_ec_clean_offline();
 *   - node online and the ltime returned by the probe equals the cached
 *     chkstat->repstat[_to].ltime: nothing to do;
 *   - node online with a different ltime: __chunk_ec_clean_online(), and if
 *     that fails with ENOSPC fall back to __chunk_ec_clean_offline().
 *
 * args->op must be __OP_NOOP, __OP_WRITE or __OP_READ (asserted).
 * Returns 0 on success or the error of the failing step.
 */
STATIC int __chunk_ec_clean(args_t *args, buffer_t *buf, int _to,
                         const clockstat_t *clockstat, int force)
{
        int ret;
        time_t ltime;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const nid_t *to;

        YASSERT(args->op == __OP_NOOP || args->op == __OP_WRITE || args->op == __OP_READ);

        to = &chkinfo->diskid[_to].id;

        DBUG("check chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
              network_rname(to));

        if (!__chunk_ec_isonline(to, &ltime)) {
                ret = __chunk_ec_clean_offline(args, buf, _to, clockstat, force);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        if (chkstat->repstat[_to].ltime == ltime) {
                DBUG("chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
                      network_rname(to));
                return 0;
        }

        ret = __chunk_ec_clean_online(args, buf, _to, clockstat,
                                      &chkstat->repstat[_to]);
        if (ret == ENOSPC) {
                /* no room on the current disk: try to place it elsewhere */
                ret = __chunk_ec_clean_offline(args, buf, _to, clockstat, force);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Resynchronise a dirty replica slot `_to` whose node is reachable.
 *
 * The replica is connected, `buf` is pushed to it unconditionally with the
 * reference vclock, the slot status is reset to 0 and the replica's repstat
 * is recorded in chkstat.
 *
 * A connect failing with EIO is tolerated: the push is still performed and
 * the replica is connected again afterwards, so that the repstat stored in
 * chkstat always comes from a successful chunk_connect().
 *
 * Returns 0 on success, otherwise the error of the failing call; on failure
 * neither the slot status nor chkstat is modified.
 */
STATIC int __chunk_ec_dirty_online(args_t *args, buffer_t *buf, int _to,
                                const clockstat_t *clockstat1)
{
        int ret, forcesync = 0;
        clockstat_t clockstat2;
        const nid_t *to;
        repstat_t repstat;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const chkid_t *chkid = &chkinfo->id;

        to = &chkinfo->diskid[_to].id;

        ret = chunk_connect(to, chkid, &args->parent, 0, &clockstat2, &repstat);
        if (unlikely(ret)) {
                if (ret == EIO) {
                        forcesync = 1;
                } else
                        GOTO(err_ret, ret);
        }

        ret = __chunk_ec_push(args, buf, to, &clockstat1->vclock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (forcesync) {
                /* the first connect failed, so `repstat` has not been filled yet */
                ret = chunk_connect(to, chkid, &args->parent, 0, &clockstat2, &repstat);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        DINFO(""CHKID_FORMAT" %s sync\n", CHKID_ARG(chkid),
              network_rname(to));

        chkinfo_set_status(chkinfo, _to, 0);
        chkstat->repstat[_to] = repstat;

        CHKINFO_DUMP(chkinfo, D_INFO);
        DINFO("reset "CHKID_FORMAT" @ %s clock %llu\n",
              CHKID_ARG(&chkinfo->id), network_rname(to), (LLU)clockstat1->vclock.clock);

        return 0;
err_ret:
        return ret;
}

/*
 * Resynchronise a dirty replica slot `_to` by placing it on a new disk.
 *
 * With force == 0 nothing is done synchronously: a background sync is
 * scheduled with chunk_bh_sync() and 0 is returned.
 *
 * With force != 0 `buf` is pushed via __chunk_ec_push_newdisk(); on success
 * the slot is reconnected, its repstat stored in chkstat and its status
 * reset to clean.  Any push/connect error is returned (ENOSPC is logged
 * first); the slot status is left untouched in that case.
 */
STATIC int __chunk_ec_dirty_offline(args_t *args, buffer_t *buf, int _to,
                                 const clockstat_t *clockstat, int force)
{
        int ret;
        repstat_t repstat;
        clockstat_t remote;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const nid_t *to;

        if (force == 0) {
                chunk_bh_sync(&args->parent, &chkinfo->id, 0);
                return 0;
        }

        to = &chkinfo->diskid[_to].id;

        DINFO(""CHKID_FORMAT" need sync immediately\n", CHKID_ARG(&chkinfo->id));
        ret = __chunk_ec_push_newdisk(args, buf, _to, &clockstat->vclock);
        if (unlikely(ret)) {
                /* force is known to be non-zero here, so every failure is fatal */
                if (ret == ENOSPC) {
                        DWARN(""CHKID_FORMAT" @ %s sync fail, force:%d\n", CHKID_ARG(&chkinfo->id),
                              network_rname(to), force);
                }

                GOTO(err_ret, ret);
        }

        ret = chunk_connect(to, &chkinfo->id, &args->parent, 0, &remote, &repstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chkstat->repstat[_to] = repstat;
        chkinfo_set_status(chkinfo, _to, 0);
        YASSERT(chkinfo->diskid[_to].status == __S_CLEAN);

        return 0;
err_ret:
        return ret;
}

/*
 * Handle a replica slot whose recorded status is dirty.
 *
 * If the node is online __chunk_ec_dirty_online() is tried first; only an
 * ENOSPC failure of that step falls back to __chunk_ec_dirty_offline().
 * If the node is offline the offline path is used directly.  An ENOSPC
 * from the offline path is logged before being returned.
 *
 * args->op must be __OP_NOOP, __OP_WRITE or __OP_READ (asserted).
 * Returns 0 on success or the error of the failing step.
 */
STATIC int __chunk_ec_dirty(args_t *args, buffer_t *buf, int _to,
                                const clockstat_t *clockstat, int force)
{
        int ret;
        time_t ltime;
        chkinfo_t *chkinfo = args->chkinfo;
        const nid_t *to;

        YASSERT(args->op == __OP_NOOP || args->op == __OP_WRITE || args->op == __OP_READ);

        to = &chkinfo->diskid[_to].id;

        DINFO("check chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(&chkinfo->id),
              network_rname(to));

        if (__chunk_ec_isonline(to, &ltime)) {
                ret = __chunk_ec_dirty_online(args, buf, _to, clockstat);
                if (likely(ret == 0))
                        return 0;

                if (ret != ENOSPC)
                        GOTO(err_ret, ret);

                /* ENOSPC: continue with the offline (new disk) path below */
        }

        ret = __chunk_ec_dirty_offline(args, buf, _to, clockstat, force);
        if (unlikely(ret)) {
                if (ret == ENOSPC) {
                        DWARN("sync chunk "CHKID_FORMAT" @ %s fail\n",
                              CHKID_ARG(&chkinfo->id),
                              network_rname(to));
                }

                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Verify a replica slot `_to` in "check" state whose node is reachable.
 *
 * The replica is connected and, if its clock state differs from
 * `clockstat1` (see __chunk_ec_equal), `buf` is pushed to it.  A connect
 * failing with EIO or ENODEV on the first pass forces a push and exactly
 * one more connect attempt.
 *
 * On success the slot status is reset to 0, the repstat of the last
 * chunk_connect() is stored in chkstat and 0 is returned.  On failure the
 * error is returned and neither is modified.
 */
STATIC int __chunk_ec_needcheck_online(args_t *args, buffer_t *buf, int _to,
                                    const clockstat_t *clockstat1)
{
        int ret, forced, attempts = 0;
        clockstat_t remote;
        repstat_t repstat;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const chkid_t *chkid = &chkinfo->id;
        const nid_t *to = &chkinfo->diskid[_to].id;

        for (;;) {
                forced = 0;
                ret = chunk_connect(to, chkid, &args->parent, 0, &remote, &repstat);
                if (unlikely(ret)) {
                        /* only EIO/ENODEV on the very first pass is recoverable */
                        if (attempts != 0 || (ret != EIO && ret != ENODEV))
                                GOTO(err_ret, ret);

                        forced = 1;
                }

                /* `remote` is only valid when the connect succeeded */
                if (!forced && __chunk_ec_equal(chkid, clockstat1, to, &remote))
                        break;

                ret = __chunk_ec_push(args, buf, to, &clockstat1->vclock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (!forced)
                        break;

                /* cannot fire: a forced push only happens while attempts == 0 */
                if (attempts) {
                        DWARN(""CHKID_FORMAT" retry %u\n", CHKID_ARG(&chkinfo->id), attempts);
                }

                attempts++;
        }

        chkinfo_set_status(chkinfo, _to, 0);
        chkstat->repstat[_to] = repstat;

        return 0;
err_ret:
        return ret;
}

/*
 * Rebuild a replica slot `_to` in "check" state on a new disk.
 *
 * Only __OP_WRITE triggers work; any other op is a logged no-op.
 * `buf` is pushed via __chunk_ec_push_newdisk(); on success the slot is
 * reconnected, its repstat stored in chkstat and its status reset to clean.
 *
 * On ENOSPC the slot is marked __S_DIRTY; with force == 0 a background sync
 * is scheduled and 0 is returned, with force != 0 ENOSPC is returned.
 * Every other error is returned as is.
 */
STATIC int __chunk_ec_needcheck_offline(args_t *args, buffer_t *buf, int _to,
                                      const clockstat_t *clockstat, int force)
{
        int ret;
        repstat_t repstat;
        clockstat_t remote;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        const nid_t *to = &chkinfo->diskid[_to].id;

        if (args->op != __OP_WRITE) {
                DBUG(""CHKID_FORMAT" noop\n", CHKID_ARG(&chkinfo->id));
                return 0;
        }

        DINFO(""CHKID_FORMAT" need sync immediately\n", CHKID_ARG(&chkinfo->id));
        ret = __chunk_ec_push_newdisk(args, buf, _to, &clockstat->vclock);
        if (unlikely(ret)) {
                if (ret != ENOSPC)
                        GOTO(err_ret, ret);

                DWARN(""CHKID_FORMAT" @ %s set dirty, force:%d\n", CHKID_ARG(&chkinfo->id),
                      network_rname(to), force);
                chkinfo_set_status(chkinfo, _to, __S_DIRTY);
                if (force != 0)
                        GOTO(err_ret, ret);

                /* not forced: let the background handler retry later */
                chunk_bh_sync(&args->parent, &chkinfo->id, 0);
                return 0;
        }

        ret = chunk_connect(to, &chkinfo->id, &args->parent, 0, &remote, &repstat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chkstat->repstat[_to] = repstat;
        chkinfo_set_status(chkinfo, _to, 0);
        YASSERT(chkinfo->diskid[_to].status == __S_CLEAN);

#if 0
        ret = network_connect(to, &chkstat->repstat[_to].ltime, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}

/*
 * Handle a replica slot whose recorded status is "check".
 *
 * Online node: __chunk_ec_needcheck_online(), falling back to the offline
 * path only when that fails with ENOSPC.  Offline node: the offline path
 * directly.
 *
 * args->op must be __OP_NOOP, __OP_WRITE or __OP_READ (asserted).
 * Returns 0 on success or the error of the failing step.
 */
STATIC int __chunk_ec_needcheck(args_t *args, buffer_t *buf, int _to,
                const clockstat_t *clockstat, int force)
{
        int ret;
        time_t ltime;
        const nid_t *to;

        YASSERT(args->op == __OP_NOOP || args->op == __OP_WRITE || args->op == __OP_READ);

        to = &args->chkinfo->diskid[_to].id;

        if (__chunk_ec_isonline(to, &ltime)) {
                ret = __chunk_ec_needcheck_online(args, buf, _to, clockstat);
                if (likely(ret == 0))
                        return 0;

                if (ret != ENOSPC)
                        GOTO(err_ret, ret);

                /* ENOSPC: continue with the offline (new disk) path below */
        }

        ret = __chunk_ec_needcheck_offline(args, buf, _to, clockstat, force);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Dispatch the repair of replica slot `to` according to its status:
 * __S_CLEAN -> __chunk_ec_clean, __S_DIRTY -> __chunk_ec_dirty,
 * __S_CHECK -> __chunk_ec_needcheck.  Any other status hits
 * UNIMPLEMENTED().
 *
 * `buf` carries the data for this slot.  Returns 0 on success or the error
 * reported by the selected handler.
 */
STATIC int __chunk_ec_check1__(args_t *args, const clockstat_t *clockstat,
                            int to, buffer_t *buf, int force)
{
        int ret = 0;
        const reploc_t *slot = &args->chkinfo->diskid[to];

        if (likely(slot->status == __S_CLEAN)) {
                ret = __chunk_ec_clean(args, buf, to, clockstat, force);
        } else if (slot->status == __S_DIRTY) {
                ret = __chunk_ec_dirty(args, buf, to, clockstat, force);
        } else if (slot->status == __S_CHECK) {
                ret = __chunk_ec_needcheck(args, buf, to, clockstat, force);
        } else {
                UNIMPLEMENTED(__DUMP__);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Recover the strips of a chunk and repair every replica flagged in
 * `src_in_err`.
 *
 * An io descriptor is built with io_init() (offset 0, size
 * ec.k * LICH_CHUNK_SPLIT), the strip buffers are set up and
 * __ec_strips_recovery() fills them.  Each slot i with src_in_err[i] != 0
 * is then handed, together with strips[i], to __chunk_ec_check1__().
 *
 * The strips are released on every exit path.  Returns 0 on success or the
 * first error encountered; repair stops at the first failing slot.
 */
STATIC int __chunk_ec_check1(args_t *args, const clockstat_t *clockstat,
                const clockstat_t *clocks, unsigned char *src_in_err, int force)
{
        int ret, slot;
        io_t io;
        uint32_t off = 0, count = 0;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        buffer_t strips[EC_MMAX];

        io_init(&io, &chkinfo->id, &clockstat->vclock, 0, args->ec.k * LICH_CHUNK_SPLIT, 0);
        __ec_strips_init(&io, &args->ec, &off, &count, strips, FALSE, FALSE);

        ret = __ec_strips_recovery(chkinfo, chkstat, clocks, &io,
                                   &args->ec, off, count, strips, src_in_err);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (slot = 0; slot < (int)chkinfo->repnum; slot++) {
                if (src_in_err[slot]) {
                        ret = __chunk_ec_check1__(args, clockstat, slot,
                                                  &strips[slot], force);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        }

        __ec_strips_free(strips);
        return 0;
err_ret:
        __ec_strips_free(strips);
        return ret;
}

/*
 * Re-write strips [off, off + count) to every healthy replica of the chunk.
 *
 * Replicas with src_in_err[i] != 0 are skipped.  The write to the first
 * healthy local replica is started first (through `ctx`), then the remote
 * replicas are written one by one, and finally the local write is waited
 * for.  Each write carries that replica's own vclock from `clocks`.
 *
 * A failure to start the local write or to complete it is returned as is.
 * Failures of remote writes are not fatal by themselves, but unless every
 * healthy replica was written the function returns EAGAIN.
 *
 * NOTE(review): only the first healthy local replica is written; whether
 * two replicas of one chunk can ever be local is not visible here.
 *
 * Returns 0 when all healthy replicas were written.
 */
STATIC int __ec_strips_redo(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const clockstat_t *clocks,
                const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *strips, unsigned char *src_in_err)
{
        int ret, i, err = 0, success = 0, local = 0;
        const nid_t *nid;
        chunk_local_write_ctx_t ctx;
        io_t io;

        /*
         * Zero the descriptor first: only offset/size/vclock are filled in
         * here, and the remaining fields would otherwise reach
         * __ec_replica_write() uninitialised.
         */
        memset(&io, 0, sizeof(io));
        io.offset = off * STRIP_BLOCK;
        io.size = count * STRIP_BLOCK;

        /* pass 1: kick off the write on the first healthy local replica */
        for (i = 0; i < ec->m; i++) {
                if (src_in_err[i])
                        continue;

                nid = &chkinfo->diskid[i].id;
                if (!net_islocal(nid))
                        continue;

                io.vclock = clocks[i].vclock;
                ret = __ec_replica_write(&ctx, chkinfo, chkstat, &io, i, strips);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                local = 1;
                break;
        }

        /*
         * pass 2: write the healthy remote replicas.  Failed slots are
         * tallied in this pass only, so that `err` is exactly the number
         * of skipped replicas.
         */
        for (i = 0; i < ec->m; i++) {
                if (src_in_err[i]) {
                        err += 1;
                        continue;
                }

                nid = &chkinfo->diskid[i].id;
                if (net_islocal(nid))
                        continue;

                io.vclock = clocks[i].vclock;
                ret = __ec_replica_write(NULL, chkinfo, chkstat, &io, i, strips);
                if (unlikely(ret))
                        continue;

                success++;
        }

        if (local) {
                ret = chunk_local_write_wait(&ctx);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                success++;
        }

        /* every replica not flagged in src_in_err must have been written */
        if (unlikely(success != ec->m - err)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Split `buf` into ec->m per-replica strip buffers and redo the write.
 *
 * `buf` must hold exactly STRIP_BLOCK * count * ec->m bytes (asserted);
 * consecutive runs of STRIP_BLOCK * count bytes are popped from it into
 * strips[0..m-1], which are then passed to __ec_strips_redo().
 *
 * The strips are released on every exit path.  Returns the result of
 * __ec_strips_redo().
 */
STATIC int __chunk_ec_redo(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const clockstat_t *clocks, const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *buf, unsigned char *src_in_err)
{
        int ret, idx;
        buffer_t strips[EC_MMAX];

        YASSERT(buf->len == STRIP_BLOCK * count * ec->m);

        idx = 0;
        while (idx < ec->m) {
                mbuffer_init(&strips[idx], 0);
                mbuffer_pop(buf, &strips[idx], STRIP_BLOCK * count);
                idx++;
        }

        ret = __ec_strips_redo(chkinfo, chkstat, clocks,
                               ec, off, count, strips, src_in_err);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __ec_strips_free(strips);
        return 0;
err_ret:
        __ec_strips_free(strips);
        return ret;
}

/*
 * Public entry point for redoing an EC write; forwards all arguments to
 * __chunk_ec_redo() and returns its result unchanged.
 */
int chunk_proto_ec_redo(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const clockstat_t *clocks, const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *buf, unsigned char *src_in_err)
{
        int ret;

        ret = __chunk_ec_redo(chkinfo, chkstat, clocks,
                              ec, off, count, buf, src_in_err);

        return ret;
}

/*
 * Classify the replicas of a chunk and decide whether a redo is needed.
 *
 * Step 1: every replica for which chunk_replica_consistent() reports
 *         inconsistency is flagged in args->src_in_err[].  Flags are only
 *         ever set here, never cleared.  If more than (ec.m - ec.k)
 *         replicas are flagged the chunk cannot be decoded and ENONET is
 *         returned.
 * Step 2: the vclock.clock values (from `clocks`) of the unflagged
 *         replicas are compared; any difference means a redo is required.
 *
 * When `_redo` is non-NULL the verdict (0/1) is stored there and 0 is
 * returned.  When `_redo` is NULL the function returns EAGAIN regardless
 * of the verdict.
 *
 * `clockstat` is currently unused.
 */
STATIC int __chunk_ec_check__(args_t *args, const clockstat_t *clockstat, clockstat_t *clocks, int *_redo)
{
        int ret, i, err = 0, flag = 0, redo = 0, consistent;
        uint64_t clock = 0;
        reploc_t *reploc;
        repstat_t *repstat;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;

        (void) clockstat;

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                reploc = &chkinfo->diskid[i];
                repstat = &chkstat->repstat[i];

                consistent = chunk_replica_consistent(&chkinfo->id, reploc, repstat->ltime, args->vfm);
                if (likely(consistent))
                        continue;

                args->src_in_err[i] = 1;
                err += 1;
        }

        /* at most (m - k) strips may be missing for the data to be decodable */
        if (err > args->ec.m - args->ec.k) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }

        for (i = 0; i < (int)chkinfo->repnum; i++) {
                if (args->src_in_err[i])
                        continue;

                if (flag == 0) {
                        /* first healthy replica provides the reference clock */
                        clock = clocks[i].vclock.clock;
                        flag = 1;
                } else if (clock != clocks[i].vclock.clock) {
                        DINFO("chunk "CHKID_FORMAT" need redo\n", CHKID_ARG(&chkinfo->id));
                        redo = 1;
                        break;
                }
        }

        if (_redo == NULL) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        *_redo = redo;

        return 0;
err_ret:
        return ret;
}

/*
 * Full check of a chunk.
 *
 * __chunk_ec_get_clean2() fills args->clockstat / args->clocks; the
 * resulting clock is then recorded in chkstat->chkstat_clock, the chkstat
 * magic is refreshed with chunk_magic(), and __chunk_ec_check__() performs
 * the replica classification.
 *
 * Returns 0 on success or the error of the failing step.
 */
STATIC int __chunk_ec_check_fully(args_t *args, int *redo)
{
        int ret;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        clockstat_t *ref = args->clockstat;

        DBUG("check chunk "CHKID_FORMAT"  fully\n", CHKID_ARG(&chkinfo->id));

        ret = __chunk_ec_get_clean2(chkinfo, chkstat, &args->parent, ref,
                                    args->clocks, &args->ec, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chkstat->chkstat_clock = ref->vclock.clock;
        chkstat->magic = chunk_magic();

        ret = __chunk_ec_check__(args, ref, args->clocks, redo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Partial check of a chunk.
 *
 * __chunk_ec_get_clean1() fills args->clockstat / args->clocks.  If the
 * resulting state is dirty, or its clock differs from the cached
 * chkstat->chkstat_clock, the chunk is considered to be in the middle of a
 * write and EAGAIN is returned.  Otherwise the clock state is normalised
 * (clock = chkstat_clock, dirty = 0) and __chunk_ec_check__() performs the
 * replica classification.
 *
 * Returns 0 on success or the error of the failing step.
 */
STATIC int __chunk_ec_check_partly(args_t *args, int *redo)
{
        int ret;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        clockstat_t *ref = args->clockstat;

        DBUG("check chunk "CHKID_FORMAT"  partly\n", CHKID_ARG(&chkinfo->id));

        ret = __chunk_ec_get_clean1(chkinfo, chkstat, &args->parent, ref,
                                    args->clocks, &args->ec);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (unlikely(ref->dirty != 0
                     || ref->vclock.clock != chkstat->chkstat_clock)) {
                DINFO("chunk "CHKID_FORMAT" in writing clock %llu --> %llu\n",
                      CHKID_ARG(&chkinfo->id), (LLU)ref->vclock.clock,
                      (LLU)chkstat->chkstat_clock);

                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ref->vclock.clock = chkstat->chkstat_clock;
        ref->dirty = 0;

        ret = __chunk_ec_check__(args, ref, args->clocks, redo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Best-effort forced reconnect of every replica of a chunk.
 *
 * Each replica is connected with chunk_connect() (fourth argument 1).
 * For every replica that connects, its repstat is stored in chkstat with
 * ltime reset to 0; replicas that fail are logged and skipped.
 *
 * Always returns 0.
 */
STATIC int __chunk_ec_connect_force(const chkid_t *parent, const chkinfo_t *chkinfo, chkstat_t *chkstat)
{
        int ret, idx;
        const nid_t *nid;
        clockstat_t ignored;
        repstat_t repstat;

        DINFO("force connect "CHKID_FORMAT"\n", CHKID_ARG(&chkinfo->id));

        for (idx = 0; idx < (int)chkinfo->repnum; idx++) {
                nid = &chkinfo->diskid[idx].id;
                ret = chunk_connect(nid, &chkinfo->id, parent, 1, &ignored, &repstat);
                if (likely(ret == 0)) {
                        repstat.ltime = 0;
                        chkstat->repstat[idx] = repstat;
                } else {
                        DWARN("connect "CHKID_FORMAT" @%s ret %u %s\n",
                              CHKID_ARG(&chkinfo->id), network_rname(nid), ret, strerror(ret));
                }
        }

        return 0;
}

/*
 * Run the full or the partial chunk check depending on whether
 * chunk_proto_connected() reports the chunk as connected.
 *
 * Returns 0 on success or the error of the selected check.
 */
STATIC int __chunk_proto_ec_check(args_t *args, int *redo)
{
        int ret;

        if (chunk_proto_connected(args->chkinfo, args->chkstat, NULL) == 0) {
                /*
                 * After a volume is loaded every replica of the chunk has
                 * ltime = 0, so this branch is taken; the clock of the
                 * selected replicas is synced into chkstat_clock.
                 */
                ret = __chunk_ec_check_fully(args, redo);
        } else {
                ret = __chunk_ec_check_partly(args, redo);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Check an EC chunk.
 *
 * Chunks for which eclog_chunk_islog() is true are handed to
 * chunk_proto_rep_check() instead.  A chunk that chunk_proto_consistent()
 * already accepts needs no work.  Otherwise __chunk_proto_ec_check() is run
 * without a redo output; if it fails with EIO, or with EPERM while `force`
 * is set, all replicas are force-reconnected and the check is repeated
 * once.
 *
 * Returns 0 on success or the error of the (last) check.
 */
int chunk_proto_ec_check(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat, vfm_t *vfm,
                const fileid_t *parent, int op, int force, int tier, int flags, const ec_t *ec)
{
        int ret, attempts = 0, retryable;
        clockstat_t clockstat;
        clockstat_t clocks[EC_MMAX];
        unsigned char src_in_err[EC_MMAX] = {0};
        args_t args;

        if (eclog_chunk_islog(&chkinfo->id, ec)) {
                return chunk_proto_rep_check(pool, chkinfo, chkstat, vfm,
                                             parent, op, force, tier, flags, NULL);
        }

        args_init(&args, pool, parent, chkinfo, chkstat, vfm, op, tier, flags, ec, &clockstat, clocks, src_in_err);

        if (likely(chunk_proto_consistent(chkinfo, chkstat, NULL))) {
                return 0;
        }

        DBUG("check "CHKID_FORMAT"\n", CHKID_ARG(&chkinfo->id));

        for (;;) {
                ret = __chunk_proto_ec_check(&args, NULL);
                if (likely(ret == 0))
                        break;

                retryable = (ret == EIO) || ((ret == EPERM) && force);
                if (!retryable || attempts != 0)
                        GOTO(err_ret, ret);

                /* one shot: reconnect every replica, then check again */
                __chunk_ec_connect_force(&args.parent, chkinfo, chkstat);
                attempts++;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Check an EC chunk and report whether recovery and/or redo is needed.
 *
 * If chunk_proto_consistent() accepts the chunk, *recovery is set to 0 and
 * 0 is returned.  Otherwise *recovery is set to 1 and
 * __chunk_proto_ec_check() is run with `redo` as output; clockstat, clocks
 * and src_in_err are the caller-provided buffers it fills.  A failure with
 * EIO, or with EPERM while `force` is set, triggers one forced reconnect of
 * all replicas followed by a single repeat of the check.
 *
 * Returns 0 on success or the error of the (last) check.
 */
int chunk_proto_ec_check_needredo(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat, const fileid_t *parent,
                      int op, int force, int tier, int flags, const ec_t *ec,
                      clockstat_t *clockstat, clockstat_t *clocks, unsigned char *src_in_err,
                      int *redo, int *recovery)
{
        int ret, attempts = 0, retryable;
        args_t args;

        args_init(&args, pool, parent, chkinfo, chkstat, NULL, op, tier, flags, ec, clockstat, clocks, src_in_err);

        if (likely(chunk_proto_consistent(chkinfo, chkstat, NULL))) {
                *recovery = 0;
                return 0;
        }

        *recovery = 1;
        DBUG("check "CHKID_FORMAT"\n", CHKID_ARG(&chkinfo->id));

        for (;;) {
                ret = __chunk_proto_ec_check(&args, redo);
                if (likely(ret == 0))
                        break;

                retryable = (ret == EIO) || ((ret == EPERM) && force);
                if (!retryable || attempts != 0)
                        GOTO(err_ret, ret);

                /* one shot: reconnect every replica, then check again */
                __chunk_ec_connect_force(&args.parent, chkinfo, chkstat);
                attempts++;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Tell whether the healthy replicas disagree on their clock.
 *
 * Among the first ec->m entries, those with src_in_err[i] != 0 are ignored.
 * Returns 1 as soon as a healthy replica's vclock.clock differs from that
 * of the first healthy replica, 0 if all agree (or fewer than two are
 * healthy).
 */
STATIC int __chunk_ec_needsetclock(const ec_t *ec, clockstat_t *clocks, unsigned char *src_in_err)
{
        int i, first = -1;

        for (i = 0; i < ec->m; i++) {
                if (src_in_err[i])
                        continue;

                if (first < 0) {
                        /* remember the reference replica */
                        first = i;
                        continue;
                }

                if (clocks[i].vclock.clock != clocks[first].vclock.clock)
                        return 1;
        }

        return 0;
}

/*
 * Run the EC check and, when needed, push one common clock to every replica.
 *
 * __chunk_ec_check1() is invoked first (with its final argument set to 0).
 * Afterwards, if __chunk_ec_needsetclock() reports that the unflagged
 * replicas carry different clocks, a copy of *clockstat with dirty cleared
 * is written to each of the chkinfo->repnum replicas: through
 * replica_srv_setclock() when net_islocal() accepts the node, through
 * replica_rpc_setclock() otherwise. The first failing call aborts the loop.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int chunk_proto_ec_check_recovery(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat, const fileid_t *parent,
                      int op, int tier, int flags, const ec_t *ec,
                      clockstat_t *clockstat, clockstat_t *clocks, unsigned char *src_in_err)
{
        int ret, idx;
        args_t args;
        const nid_t *target;
        clockstat_t clean;

        args_init(&args, pool, parent, chkinfo, chkstat, NULL, op, tier, flags, ec, clockstat, clocks, src_in_err);

        ret = __chunk_ec_check1(&args, clockstat, clocks, src_in_err, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* common case: clocks already agree, nothing to rewrite */
        if (likely(!__chunk_ec_needsetclock(ec, clocks, src_in_err)))
                return 0;

        for (idx = 0; idx < (int)chkinfo->repnum; idx++) {
                target = &chkinfo->diskid[idx].id;

                /* fresh copy per replica, always stored as not dirty */
                clean = *clockstat;
                clean.dirty = 0;

                ret = net_islocal(target)
                        ? replica_srv_setclock(target, &chkinfo->id, &clean)
                        : replica_rpc_setclock(target, &chkinfo->id, &clean);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Mark the replicas that are not intact and hand the chunk to the EC check.
 *
 * Every replica for which chunk_replica_intact() returns zero is flagged in
 * a local src_in_err[] array. If more replicas are flagged than
 * args->ec.m - args->ec.k, EAGAIN is returned. Otherwise
 * __chunk_ec_check1() is called with that array and a final argument of 1.
 *
 * Returns 0 on success, EAGAIN when too many replicas are flagged, or the
 * error code of __chunk_ec_check1().
 */
STATIC int __chunk_ec_sync(args_t *args, const clockstat_t *clockstat, const clockstat_t *clocks)
{
        int ret, idx, broken = 0;
        chkinfo_t *chkinfo = args->chkinfo;
        chkstat_t *chkstat = args->chkstat;
        unsigned char src_in_err[EC_MMAX] = {0};

        for (idx = 0; idx < (int)chkinfo->repnum; idx++) {
                if (!chunk_replica_intact(&chkinfo->id, &chkinfo->diskid[idx],
                                          chkstat->repstat[idx].ltime, NULL)) {
                        src_in_err[idx] = 1;
                        broken++;
                }
        }

        if (broken > args->ec.m - args->ec.k) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = __chunk_ec_check1(args, clockstat, clocks, src_in_err, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Synchronize the replicas of an EC chunk.
 *
 * Chunks for which eclog_chunk_islog() is true are delegated to
 * chunk_proto_rep_sync(). Chunks that chunk_proto_intact() already accepts
 * need no work and return 0 immediately.
 *
 * Otherwise the clean clock is fetched with __chunk_ec_get_clean1(). The
 * sync is refused with EAGAIN when that clock is marked dirty or when it
 * differs from chkstat->chkstat_clock. If both tests pass, the chunk is
 * handed to __chunk_ec_sync().
 *
 * Returns 0 on success, EAGAIN when the chunk cannot be synced right now,
 * or the error code of the failing helper.
 */
int chunk_proto_ec_sync(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat, vfm_t *vfm,
                const fileid_t *parent, int force, int tier, int flags, const ec_t *ec)
{
        int ret;
        clockstat_t clockstat;
        clockstat_t clocks[EC_MMAX];
        args_t args;

        if (eclog_chunk_islog(&chkinfo->id, ec)) {
                /*
                 * The arguments mirror this function's own parameter list,
                 * with no EC layout. There is no `op` in scope here, so none
                 * is forwarded.
                 * NOTE(review): confirm against the prototype of
                 * chunk_proto_rep_sync().
                 */
                return chunk_proto_rep_sync(pool, chkinfo, chkstat, vfm,
                                            parent, force, tier, flags, NULL);
        }

        if (chunk_proto_intact(chkinfo, chkstat)) {
                return 0;
        }

        ANALYSIS_BEGIN(0);

        CHKINFO_DUMP(chkinfo, D_INFO);

        ret = __chunk_ec_get_clean1(chkinfo, chkstat, parent, &clockstat, clocks, ec);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* a dirty clean-clock is reported as EAGAIN rather than asserted on */
        if (unlikely(clockstat.dirty != 0)) {
                DINFO("chunk "CHKID_FORMAT" in writing clock %llu --> %llu\n",
                      CHKID_ARG(&chkinfo->id), (LLU)clockstat.vclock.clock,
                      (LLU)chkstat->chkstat_clock);

                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        if (clockstat.vclock.clock != chkstat->chkstat_clock) {
                DINFO("chunk "CHKID_FORMAT" in writing clock %llu --> %llu\n",
                      CHKID_ARG(&chkinfo->id), (LLU)clockstat.vclock.clock,
                      (LLU)chkstat->chkstat_clock);
                YASSERT(clockstat.vclock.clock < chkstat->chkstat_clock);
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        /*
         * args.ec must carry the real layout: __chunk_ec_sync() derives the
         * number of tolerated broken replicas from args->ec.m - args->ec.k.
         */
        args_init(&args, pool, parent, chkinfo, chkstat, vfm, __OP_WRITE, tier, flags, ec, NULL, NULL, NULL);

        ret = __chunk_ec_sync(&args, &clockstat, clocks);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, 1000 * 100, NULL);

        return 0;
err_ret:
        return ret;
}

/*
 * Move one replica of an EC chunk to a new node.
 *
 * Replica @_i of the current layout (@_chkinfo / @_chkstat) is the source
 * and @nid[@i] is the destination. The move is refused with EAGAIN when the
 * source replica has a non-zero status, or when the clock read from it with
 * chunk_getclock() differs from _chkstat->chkstat_clock.
 *
 * Otherwise the data is pushed with chunk_push(), stamped with
 * chkinfo->info_version + 1, and the destination is connected with
 * chunk_connect(). On success slot @i of the new layout (@chkinfo /
 * @chkstat) records the destination node, status __S_CLEAN and the repstat
 * returned by chunk_connect().
 *
 * Returns 0 on success, otherwise EAGAIN or the failing helper's error code.
 */
STATIC int __chunk_ec_move(const char *pool, chkinfo_t *_chkinfo, chkstat_t *_chkstat, int _i, const chkid_t *parent,
                const nid_t *nid, int i, chkinfo_t *chkinfo, chkstat_t *chkstat, int tier, int flags)
{
        int ret;
        reploc_t *src = &_chkinfo->diskid[_i];
        const nid_t *dst = &nid[i];
        clockstat_t src_clock, dst_clock;
        repstat_t dst_stat;

        if (src->status) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = chunk_getclock(&src->id, &_chkinfo->id, &src_clock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (src_clock.vclock.clock != _chkstat->chkstat_clock) {
                DINFO("chunk "CHKID_FORMAT" in writing clock %llu --> %llu\n",
                      CHKID_ARG(&_chkinfo->id), (LLU)src_clock.vclock.clock,
                      (LLU)_chkstat->chkstat_clock);

                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = chunk_push(pool, &_chkinfo->id, &src->id,
                         dst, &src_clock.vclock,
                         chkinfo->info_version + 1, parent, tier, flags);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = chunk_connect(dst, &_chkinfo->id, parent, 0, &dst_clock, &dst_stat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* publish the new location only after push and connect both succeeded */
        chkinfo->diskid[i].id = *dst;
        chkinfo->diskid[i].status = __S_CLEAN;
        chkstat->repstat[i] = dst_stat;

        return 0;
err_ret:
        return ret;
}

/*
 * Relocate the replicas of an EC chunk to the node list @nid.
 *
 * Chunks for which eclog_chunk_islog() is true are delegated to
 * chunk_proto_rep_move(). For the others @count must equal
 * _chkinfo->repnum (asserted) and the chunk must pass
 * chunk_proto_consistent(), otherwise EAGAIN is returned.
 *
 * A working copy of the layout is built on the stack. For each slot:
 *  - if @nid[i] equals the current node, the slot is copied unchanged;
 *  - otherwise the current node must not appear anywhere in @nid
 *    (__chunk_ec_exist() must return -1, else EAGAIN) and the replica is
 *    moved with __chunk_ec_move().
 *
 * After all slots are handled, every old node that is absent from @nid is
 * queued with chunk_cleanup_push(); the result of that call is not checked.
 * Finally the working copy, with info_version incremented, is copied back
 * into @_chkinfo / @_chkstat.
 *
 * Returns 0 on success, otherwise EAGAIN or the failing helper's error code.
 * On failure @_chkinfo / @_chkstat are left untouched.
 */
int chunk_proto_ec_move(const char *pool, chkinfo_t *_chkinfo, chkstat_t *_chkstat, const chkid_t *parent,
                     const nid_t *nid, int count, int tier, int flags, const ec_t *ec)
{
        int ret, slot, k;
        char info_buf[CHKINFO_MAX], stat_buf[CHKSTAT_MAX];
        chkinfo_t *chkinfo = (void *)info_buf;
        chkstat_t *chkstat = (void *)stat_buf;

        if (eclog_chunk_islog(&_chkinfo->id, ec)) {
                return chunk_proto_rep_move(pool, _chkinfo, _chkstat,
                                            parent, nid, count, tier, flags, NULL);
        }

        YASSERT(count == _chkinfo->repnum);

        if (unlikely(!chunk_proto_consistent(_chkinfo, _chkstat, NULL))) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        *chkinfo = *_chkinfo;
        *chkstat = *_chkstat;

        for (slot = 0; slot < count; slot++) {
                if (nid_cmp(&nid[slot], &_chkinfo->diskid[slot].id) == 0) {
                        /* replica stays where it is */
                        chkinfo->diskid[slot] = _chkinfo->diskid[slot];
                        chkstat->repstat[slot] = _chkstat->repstat[slot];
                        continue;
                }

                /* the old node may not show up at another position of the new list */
                if (__chunk_ec_exist(nid, count, &_chkinfo->diskid[slot].id) != -1) {
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }

                ret = __chunk_ec_move(pool, _chkinfo, _chkstat, slot, parent,
                                nid, slot, chkinfo, chkstat, tier, flags);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        for (slot = 0; slot < _chkinfo->repnum; slot++) {
                for (k = 0; k < count; k++) {
                        if (nid_cmp(&_chkinfo->diskid[slot].id, &nid[k]) == 0)
                                break;
                }

                /* inner loop ran to the end: old node is no longer used */
                if (k == count) {
                        (void) chunk_cleanup_push(pool, parent, &_chkinfo->id,
                                                  &_chkinfo->diskid[slot].id,
                                                  chkinfo->info_version);
                }
        }

        chkinfo->repnum = count;
        chkinfo->info_version++;
        CHKINFO_CP(_chkinfo, chkinfo);
        CHKSTAT_CP(_chkstat, chkstat, chkinfo->repnum);

        return 0;
err_ret:
        return ret;
}

/*
 * Operation table of the EC chunk protocol; chunk_proto_ec_init() registers
 * it under CHUNK_PROTO_EC.
 */
struct chunk_proto_ops ec_ops = {
        .name   = "chunk_proto_ec",

        /* lifecycle */
        .create = chunk_proto_ec_create,
        .unlink = chunk_proto_rep_unlink,       /* reuses the rep implementation */
        .move   = chunk_proto_ec_move,

        /* data path */
        .read   = chunk_proto_ec_read,
        .write  = chunk_proto_ec_write,

        /* consistency */
        .check  = chunk_proto_ec_check,
        .sync   = chunk_proto_ec_sync,
        .sha1   = NULL,                         /* no sha1 handler is provided */
};

/*
 * Register the EC operation table (ec_ops) under CHUNK_PROTO_EC.
 *
 * Returns whatever chunk_proto_ops_register() returns.
 */
int chunk_proto_ec_init()
{
        int ret;

        ret = chunk_proto_ops_register(&ec_ops, CHUNK_PROTO_EC);

        return ret;
}
